package main

import (
	"fmt"
	"os"
	"sync"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	// Use the env variable if running in the container, otherwise use the default.
	url := os.Getenv("NATS_URL")
	if url == "" {
		url = nats.DefaultURL
	}

	// Create an unauthenticated connection to NATS.
	nc, _ := nats.Connect(url)
	defer nc.Drain()

	// Access the JetStreamContext for managing streams and consumers
	// as well as for publishing and subscription convenience methods.
	js, _ := nc.JetStream()

	// Declare a simple [limits-based stream][1] and populate the stream
	// with a few messages.
	// [1]: /examples/jetstream/limits-stream/go/
	streamName := "EVENTS"

	js.AddStream(&nats.StreamConfig{
		Name:     streamName,
		Subjects: []string{"events.>"},
	})

	// ### Durable (implicit)
	// Like the standard [push consumer][1], the `JetStreamContext` provides
	// a simple way to create an queue push consumer. The only additional
	// argument is the "group name". If `nats.Durable` is not passed, the
	// group name is used as the durable name as well.
	// [1]: /examples/jetstream/push-consumer/go/
	fmt.Println("# Durable (implicit)")
	sub1, _ := js.QueueSubscribeSync("events.>", "event-processor", nats.AckExplicit())

	// If we inspect the consumer info, we will notice a property that
	// was not defined for the non-queue push consumer. The `DeliverGroup`
	// is the unique name of the group of subscribers. Internally, this
	// corresponds to a core NATS queue group name which we will see
	// below.
	info, _ := js.ConsumerInfo(streamName, "event-processor")
	fmt.Printf("deliver group: %q\n", info.Config.DeliverGroup)

	// Using the same helper method, we can create another subscription
	// in the group. This method implicitly checks for whether the
	// consumer has been created and binds to it based on the subject
	// and group name.
	sub2, _ := js.QueueSubscribeSync("events.>", "event-processor", nats.AckExplicit())

	// As noted above, a queue push consumer relies on a core NATS
	// queue group for distributing messages to active members. As
	// a result, we can bind a subsription by using the `DeliverSubject`
	// and the `DeliverGroup`
	// Since messages are publishing to a dedicated subject and is
	// part of a group, we can also create a core NATS subscription
	// to join the group. As a reminder, the `DeliverSubject` is
	// randomly generated by default, but this can be set explicitly
	// in the consumer config if desired.
	sub3, _ := nc.QueueSubscribeSync(info.Config.DeliverSubject, info.Config.DeliverGroup)
	fmt.Printf("deliver subject: %q\n", info.Config.DeliverSubject)

	// Now we can publish some messages to the stream to observe how
	// they are distributed to the subscribers.
	js.Publish("events.1", nil)
	js.Publish("events.2", nil)
	js.Publish("events.3", nil)
	js.Publish("events.4", nil)
	js.Publish("events.5", nil)
	js.Publish("events.6", nil)

	// As noted in the [push consumer][1] example, subscriptions
	// enqueue messages proactively. When there are a group of
	// subscriptions, each will receive a different subset of the messages.
	// When calling `NextMsg` this means, messages can be processed out
	// of order. There is no correlation with message order and
	// subscription creation order 😉. In fact, it is possible that
	// not all subscriptions will necessarily get a message.
	// [1]: /examples/jetstream/push-consumer/go/
	msg, _ := sub1.NextMsg(time.Second)
	if msg != nil {
		fmt.Printf("sub1: received message %q\n", msg.Subject)
		msg.Ack()
	} else {
		fmt.Println("sub1: receive timeout")
	}

	msg, _ = sub2.NextMsg(time.Second)
	if msg != nil {
		fmt.Printf("sub2: received message %q\n", msg.Subject)
		msg.Ack()
	} else {
		fmt.Println("sub2: receive timeout")
	}

	msg, _ = sub3.NextMsg(time.Second)
	if msg != nil {
		fmt.Printf("sub3: received message %q\n", msg.Subject)
		msg.Ack()
	} else {
		fmt.Println("sub3: receive timeout")
	}

	// Since we created this consumer using the helper method, when we unsubscribe
	// (or call `Drain`), the consumer will be deleted.
	sub1.Unsubscribe()
	sub2.Unsubscribe()
	sub3.Unsubscribe()

	// ### Durable (explicit)
	// To create a (safe) durable consumer, use the `AddConsumer`
	// method. Although it may seem redundant, a durable name *and*
	// the deliver group name must be defined. This is simply because
	// the durable name is used for all consumer types, while the
	// deliver group is exclusive to the queue push consumer. In this
	// example, the same name is used as convention which is what the helper
	// method above did as well.
	fmt.Println("\n# Durable (explicit)")

	js.AddConsumer(streamName, &nats.ConsumerConfig{
		Durable:        "event-processor",
		DeliverSubject: "my-subject",
		DeliverGroup:   "event-processor",
		AckPolicy:      nats.AckExplicitPolicy,
	})
	defer js.DeleteConsumer(streamName, "event-processor")

	// Wait for all 6 messages to be received before exiting.
	wg := &sync.WaitGroup{}
	wg.Add(6)

	// For this part, we will use async core NATS queue subscriptions. Since
	// core NATS subscriptions are JetStream-unaware, we must call `msg.Ack`
	// explicitly to notify the server that the message has been processed.
	// For a `js.QueueSubscribe` handler, by default, messages will be auto-acked
	// unless the `nats.ManualAck` subscription option is supplied.
	sub1, _ = nc.QueueSubscribe("my-subject", "event-processor", func(msg *nats.Msg) {
		fmt.Printf("sub1: received message %q\n", msg.Subject)
		msg.Ack()
		wg.Done()
	})
	sub2, _ = nc.QueueSubscribe("my-subject", "event-processor", func(msg *nats.Msg) {
		fmt.Printf("sub2: received message %q\n", msg.Subject)
		msg.Ack()
		wg.Done()
	})
	sub3, _ = nc.QueueSubscribe("my-subject", "event-processor", func(msg *nats.Msg) {
		fmt.Printf("sub3: received message %q\n", msg.Subject)
		msg.Ack()
		wg.Done()
	})

	wg.Wait()
}
